package com.sisyphus.wordcount

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

/**
 * Title: 表
 * Description: stream表
 * Author sweetdream
 * Date 2020/12/11
 */
object TableStream {
  def main(args: Array[String]): Unit = {
    // 1. env
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // 2.source
    val input = env.readTextFile(getClass.getResource("/wordcount").getPath)

    // 3. transform
    //先转换成样例类类型
    val data = input
      .flatMap(_.split(" "))
      .map(WC(_, 1))

    // 4. table
    // 创建一张表
    val dataTable: Table = tableEnv.fromDataStream(data)
    dataTable.printSchema()

    // 第一种: 调用table api进行转换
    val res = dataTable
      .groupBy('word)
      .select('word, 'num.count as 'cnt)
      .toRetractStream[Row]
      .print()

    // 第二种: sql实现
    // 注册一张临时表
    tableEnv.createTemporaryView("table1", dataTable)

    tableEnv.sqlQuery("select word,num from table1")
      .toAppendStream[Row]
      .print()

    tableEnv.sqlQuery("select word,count(num) as total from table1 group by word")
      .toRetractStream[Row]
      .print()

    env.execute("table stream")
  }
}
